Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][Octopus] Updated integration to ingest data from all spaces #958

Merged
merged 22 commits into from
Oct 10, 2024

Conversation

oiadebayo
Copy link
Member

Description

What - An update to octopus integration to enable it ingest data from all spaces

Why - The previous version ingest all spaces but not data from all spaces, api assumes default space when SpaceId is not provided as a path parameter.

How -

  • Updated client to loop over the spaces to add spaceId parameter to the path of all resources
  • Updated webhook handling to includespaceId in the path to get single resources.
  • Cached the spaces to optimize performance during the resync event.
  • Updated the delete event to use specific Id to prevent deleting related resources
  • Made the example url in description of serverUrl in the spec.yaml more expressive

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the port production environment(using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.
image
image

API Documentation

Provide links to the API documentation used for this integration.
Octopus Rest Api docs

@oiadebayo oiadebayo requested a review from PeyGis August 28, 2024 09:15
Comment on lines 74 to 75
async def get_paginated_resources(
self,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

async def get_single_resource(
self, resource_kind: str, resource_id: str
self, resource_kind: str, resource_id: str, space_id: str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if you send it an empty string? as you do when you get event from a real time webhook

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will return 404, but the code has been slightly modified so it will always have a space_id when making this call. All events has the space_id present in the payload

Copy link
Contributor

@PeyGis PeyGis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requested some changes


### Improvements

- Updated integration to ingest all spaces
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the integration to ingest resources from all spaces instead of the default space

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

params: Optional[dict[str, Any]] = None,
kind: Optional[str] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have an optional kind parameter but I don't see where you are using it in this _get_paginated_function. Also may I know why you decided to create a private helper method for pagination instead of using the existing one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The kind is used in line 76 and it is no longer optional. I have joined the code for the paginator method together

self,
kind: str,
params: Optional[dict[str, Any]] = None,
path_parameter: Optional[str] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the use of this path_parameter?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The space id goes in the path not the query parameter (params), it is a path parameter

"""Get all spaces in the Octopus instance."""
return await self._send_api_request("spaces/all")
async for spaces in self.get_paginated_resources("space"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the Objectkind.SPACE instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -77,21 +109,27 @@ async def handle_webhook_request(data: Dict[str, Any]) -> Dict[str, Any]:
payload = data.get("Payload", {}).get("Event", {})
related_document_ids = payload.get("RelatedDocumentIds", [])
event_category = payload.get("Category", "")
space_id = payload.get("SpaceId", "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under what circumstance will a webhook event not have the SpaceId? If we don't get the space then we just return since we don't know which space this event is coming from. if you are sure that we will get spaceId then use

space_id = payload["SpaceId"]

Copy link
Member Author

@oiadebayo oiadebayo Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to space_id = payload["SpaceId"] it is always present

resource_id = (
payload.get("ChangeDetails", {}).get("DocumentContext", {}).get("Id")
)
if resource_id.split("-")[0].lower() in TRACKED_EVENTS:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resource_id may not exist so let's be defensive and check if it exist before splitting

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 87 to 94
async for spaces in octopus_client.get_all_spaces():
for space in spaces:
async for resource_batch in octopus_client.get_paginated_resources(
kind, path_parameter=space.get("Id")
):
logger.info(f"Received batch of {len(resource_batch)} {kind} ")
yield resource_batch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have nested pagination, we can use this helper function

from port_ocean.utils.async_iterators import stream_async_iterators_tasks

            tasks = [
                get paginated resource (kind, space.get("Id"))
                for space in spaces
            ]
            resource_data = []
            async for batch in stream_async_iterators_tasks(*tasks):
                yield batch

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thank you for the insight

Comment on lines 130 to 131
logger.error(f"Unexpected error for space '{space_id}': {str(e)}")
return {"ok": True}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need to return anything iiuc

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed

Copy link
Contributor

@PeyGis PeyGis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some more sugestions

Comment on lines 54 to 56
existing_webhook_names = [
subscription.get("Name", "") for subscription in subscriptions
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason we are checking for webhook name instead of using the URL?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to track both, unlike other webhooks I have worked with, name is required and must be unique when creating octopus subscription and the url is not required since the subscription can be an email subscription too. Tracking url let us know if that particular url has been previously created and tracking the name if the url does not exist only helps avoid making an api call that will definitely throw a 400

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This situation will only happen in rare cases whereby a customer decides to switch hosts, like maybe point a different subdomain at the integration

async for spaces in octopus_client.get_all_spaces():
for space in spaces:
async for subscriptions in octopus_client.get_webhook_subscriptions(
space.get("Id")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you may need to check if space.get("Id") is not None, so that it doesn't cause errors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 60 to 66
elif any(
name.startswith("Port Subscription -")
for name in existing_webhook_names
):
logger.info(
"Webhook already exists with name starting with 'Port Subscription -'"
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if webhook names checking isn't important then let's remove this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 43 to 46
async for spaces in octopus_client.get_all_spaces():
for space in spaces:
async for subscriptions in octopus_client.get_webhook_subscriptions(
space.get("Id")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see 3 nested loops here. This will make it slow during the application startup process, especially when they have huge spaces. Is it possible to run some in parallel or take advantage of the stream iterator helper function you used in the global resync?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@PeyGis PeyGis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some minor changes

async for spaces in octopus_client.get_all_spaces():
for space in spaces:
async for subscriptions in octopus_client.get_webhook_subscriptions(
space.get("Id")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we be defensive and catch possible None type of space.get('iD') before passing it to get_webhook_subscriptions? I want us to avoid scenario where we pass None to the API

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

)
else:
await octopus_client.create_webhook_subscription(
app_host, space.get("Id")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since space can be None here, can we try to avoid sending API calls with None values?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@PeyGis PeyGis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

integrations/octopus/main.py Outdated Show resolved Hide resolved
Copy link
Contributor

@omby8888 omby8888 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work 🌊

@PeyGis PeyGis merged commit 9fe159b into main Oct 10, 2024
16 checks passed
@PeyGis PeyGis deleted the Update-octopus-integration-to-ingest-all-spaces branch October 10, 2024 13:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants